import contextvars
import dataclasses
import re
import unittest.mock
from abc import ABC, abstractmethod
from enum import auto
from typing import (
    IO,
    TYPE_CHECKING,
    Annotated,
    Any,
    ClassVar,
    Dict,
    List,
    Optional,
    Type,
    TypeVar,
    Union,
    runtime_checkable,
)

import pydantic
import pydantic_core
from cached_property import cached_property
from pydantic import BaseModel, ConfigDict, SecretStr, ValidationError, model_validator
from pydantic.fields import Field
from typing_extensions import Protocol, Self

from datahub.configuration._config_enum import ConfigEnum as ConfigEnum
from datahub.configuration.env_vars import get_debug
from datahub.masking.secret_registry import SecretRegistry, is_masking_enabled
from datahub.utilities.dedup_list import deduplicate_list

REDACT_KEYS = {
    "password",
    "token",
    "secret",
    "options",
    "sqlalchemy_uri",
}
REDACT_SUFFIXES = {
    "_password",
    "-password",
    "_secret",
    "-secret",
    "_token",
    "-token",
    "_key",
    "-key",
    "_key_id",
    "-key-id",
}


def _should_redact_key(key: Union[str, int]) -> bool:
    return isinstance(key, str) and (
        key in REDACT_KEYS or any(key.endswith(suffix) for suffix in REDACT_SUFFIXES)
    )


def _redact_value(value: Any) -> Any:
    if isinstance(value, str):
        # If it's just a variable reference, it's ok to show as-is.
        if value.startswith("$"):
            return value
        return "********"
    elif value is None:
        return None
    elif isinstance(value, bool):
        # We don't have any sensitive boolean fields.
        return value
    elif isinstance(value, list) and not value:
        # Empty states are fine.
        return []
    elif isinstance(value, dict) and not value:
        return {}
    else:
        return "********"


def redact_raw_config(obj: Any) -> Any:
    """Return a deep copy of a raw config structure with sensitive values masked.

    Recurses through nested dicts and lists; values under keys flagged by
    _should_redact_key are replaced via _redact_value. Scalars and unknown
    types are returned unchanged.
    """
    if isinstance(obj, dict):
        redacted = {}
        for key, value in obj.items():
            if _should_redact_key(key):
                redacted[key] = _redact_value(value)
            else:
                redacted[key] = redact_raw_config(value)
        return redacted
    if isinstance(obj, list):
        return [redact_raw_config(item) for item in obj]
    return obj


if TYPE_CHECKING:
    # At type-check time, HiddenFromDocs is a pass-through Annotated alias so
    # static analyzers still see the wrapped type.
    AnyType = TypeVar("AnyType")
    HiddenFromDocs = Annotated[AnyType, ...]
else:
    # At runtime, fields annotated with HiddenFromDocs are skipped when
    # generating the JSON schema (and hence omitted from autogenerated docs).
    HiddenFromDocs = pydantic.json_schema.SkipJsonSchema

# A str field that coerces any input (e.g. ints) to a string before validation.
LaxStr = Annotated[str, pydantic.BeforeValidator(lambda v: str(v))]

# Context variable to track if we're inside a nested ConfigModel construction.
# Set by ConfigModel._track_nesting_context and read by _register_secret_fields
# so that only root-level models register secrets.
_inside_nested_config: contextvars.ContextVar[bool] = contextvars.ContextVar(
    "_inside_nested_config", default=False
)


@dataclasses.dataclass(frozen=True)
class SupportedSources:
    """Annotated-field metadata declaring which sources a config field applies to.

    When attached to a field via typing.Annotated, pydantic invokes
    __get_pydantic_json_schema__ during schema generation, and the source
    list is surfaced in the JSON schema under "schema_extra".
    """

    # Names of the sources that support the annotated field.
    sources: List[str]

    def __get_pydantic_json_schema__(
        self,
        core_schema: pydantic_core.core_schema.CoreSchema,
        handler: pydantic.GetJsonSchemaHandler,
    ) -> pydantic.json_schema.JsonSchemaValue:
        # Generate the default schema first, then attach our source list.
        json_schema = handler(core_schema)
        json_schema.setdefault("schema_extra", {})["supported_sources"] = self.sources
        return json_schema


def _config_model_schema_extra(schema: Dict[str, Any], model: Type[BaseModel]) -> None:
    """Strip properties marked "hidden_from_docs" from a generated JSON schema.

    Mutates ``schema`` in place. Installed as ``json_schema_extra`` on
    ConfigModel so hidden fields are excluded from the autogenerated docs.
    """
    properties = schema.get("properties", {})
    # Snapshot the names first so we don't mutate the dict while iterating it.
    hidden = [name for name, prop in properties.items() if prop.get("hidden_from_docs")]
    for name in hidden:
        del properties[name]


class ConfigModel(BaseModel):
    """Base class for all strongly-typed configuration models.

    Behavior provided to subclasses:
    - Unknown fields are rejected (``extra="forbid"``).
    - Raw input values are hidden from ValidationError output unless debug
      mode is enabled.
    - Fields marked "hidden_from_docs" are stripped from the JSON schema.
    - SecretStr field values (including nested ones) are registered with the
      secret masking registry; see _register_secret_fields.
    """

    model_config = ConfigDict(
        extra="forbid",
        ignored_types=(cached_property,),
        json_schema_extra=_config_model_schema_extra,
        # Don't echo (potentially sensitive) raw input values in errors.
        hide_input_in_errors=not get_debug(),
    )

    @model_validator(mode="wrap")
    @classmethod
    def _track_nesting_context(
        cls,
        data: Any,
        handler: pydantic.ValidatorFunctionWrapHandler,
        info: pydantic.ValidationInfo,
    ) -> Self:
        """
        Wrap validator that tracks nesting context for nested ConfigModel detection.

        Sets a context variable so nested ConfigModels know they're being constructed as fields.
        """
        # Set context for any nested models that will be created during field processing
        token = _inside_nested_config.set(True)
        try:
            # Process the model normally (this calls __init__ and all validators)
            instance = handler(data)
        finally:
            # Reset context after processing
            _inside_nested_config.reset(token)

        return instance

    @model_validator(mode="after")
    def _register_secret_fields(self) -> Self:
        """
        Register SecretStr fields with the secret masking registry.
        Recursively traverses nested ConfigModel instances to find all SecretStr fields.

        Only models that are constructed outside of Pydantic field processing will register secrets.
        This ensures we capture the full qualified paths for nested secrets.

        Performance: Uses batch registration for efficiency - single version
        increment instead of one per secret.
        """
        if not is_masking_enabled():
            return self

        # Only register if we're NOT inside another ConfigModel's field processing
        # This means we're a "root" model from the user's perspective
        if _inside_nested_config.get():
            return self

        # Collect all secrets recursively (including from nested models)
        secrets: Dict[str, str] = {}
        self._collect_secrets(secrets, prefix="")

        # Batch register all secrets in one operation
        if secrets:
            SecretRegistry.get_instance().register_secrets_batch(secrets)

        return self

    def _collect_secrets(self, secrets: Dict[str, str], prefix: str) -> None:
        """
        Recursively collect SecretStr fields from this model and nested ConfigModel instances.

        Args:
            secrets: Dictionary to populate with field_name -> secret_value mappings
            prefix: Prefix for nested field names (e.g., "azure_auth." for nested fields)
        """
        for field_name, _field_info in self.__class__.model_fields.items():
            # Default to None so unset/absent fields are simply skipped.
            field_value = getattr(self, field_name, None)

            if field_value is None:
                continue

            # Build the full field path for better debugging
            full_name = f"{prefix}{field_name}" if prefix else field_name

            if isinstance(field_value, SecretStr):
                # Direct SecretStr field; skip empty secrets.
                secret_value = field_value.get_secret_value()
                if secret_value:
                    secrets[full_name] = secret_value
            elif isinstance(field_value, ConfigModel):
                # Nested ConfigModel - recurse into it
                field_value._collect_secrets(secrets, prefix=f"{full_name}.")
            elif isinstance(field_value, list):
                # Handle lists of ConfigModels
                for idx, item in enumerate(field_value):
                    if isinstance(item, ConfigModel):
                        item._collect_secrets(secrets, prefix=f"{full_name}[{idx}].")
            elif isinstance(field_value, dict):
                # Handle dicts with ConfigModel values
                for key, item in field_value.items():
                    if isinstance(item, ConfigModel):
                        item._collect_secrets(secrets, prefix=f"{full_name}[{key}].")

    @classmethod
    def parse_obj_allow_extras(cls, obj: Any) -> Self:
        """Parse an object while allowing extra fields.

        This method temporarily modifies the model's configuration to allow extra fields.

        TODO: Do we really need to support this behaviour? Consider removing this method in future.
        """
        try:
            with unittest.mock.patch.dict(
                cls.model_config,  # type: ignore
                {"extra": "allow"},
                clear=False,
            ):
                # Rebuild so the patched "extra" setting is baked into the
                # regenerated core schema before validating.
                cls.model_rebuild(force=True)  # type: ignore
                return cls.model_validate(obj)
        finally:
            # Rebuild again after patch.dict restores the original config, so
            # the strict "forbid" behavior takes effect for later validations.
            cls.model_rebuild(force=True)  # type: ignore


class PermissiveConfigModel(ConfigModel):
    """A permissive config model that allows extra fields.

    This is useful for cases where we want to strongly type certain fields,
    but still allow the user to pass in arbitrary fields that we don't care
    about. It is usually used for argument bags that are passed through to
    third-party libraries.
    """

    # NOTE: pydantic merges this with ConfigModel.model_config across the
    # inheritance chain, so only "extra" is overridden here.
    model_config = ConfigDict(extra="allow")


class ConnectionModel(BaseModel):
    """Represents the config associated with a connection.

    Deliberately extends BaseModel (not ConfigModel): extra fields are allowed,
    and none of ConfigModel's secret-registration machinery applies.
    """

    # Hide raw input in validation errors outside of debug mode, mirroring
    # ConfigModel's behavior.
    model_config = ConfigDict(extra="allow", hide_input_in_errors=not get_debug())


class TransformerSemantics(ConfigEnum):
    """Describes semantics for aspect changes."""

    # NOTE: value normalization is handled by the ConfigEnum base class
    # (datahub.configuration._config_enum).
    OVERWRITE = auto()  # Apply changes blindly
    PATCH = auto()  # Only apply differences from what exists already on the server


class TransformerSemanticsConfigModel(ConfigModel):
    """Shared config base adding aspect-change semantics options."""

    # How to apply aspect changes; defaults to blind overwrite.
    semantics: TransformerSemantics = TransformerSemantics.OVERWRITE
    replace_existing: bool = False


class DynamicTypedConfig(ConfigModel):
    """A (type, config) pair used to dynamically instantiate pluggable objects."""

    # Once support for discriminated unions gets merged into Pydantic, we can
    # simplify this configuration and validation.
    # See https://github.com/samuelcolvin/pydantic/pull/2336.

    type: str = Field(
        description="The type of the dynamic object",
    )
    # This config type is declared Optional[Any] here. The eventual parser for the
    # specified type is responsible for further validation.
    config: Optional[Any] = Field(
        default=None,
        description="The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19).",
    )


# TODO: Many of these exception types are fairly specialized and shouldn't live in a common module.


class MetaError(Exception):
    """A base class for all meta exceptions."""


class PipelineExecutionError(MetaError):
    """An error occurred when executing the pipeline."""


class GraphError(MetaError):
    """An error in communicating with the DataHub Graph."""


class OperationalError(GraphError):
    """A GraphError with extra debug annotations."""

    # Human-readable description of the failure.
    message: str
    # Arbitrary extra debug context (e.g. a server response payload).
    info: dict

    def __init__(self, message: str, info: Optional[dict] = None):
        # Fix: call Exception.__init__ so that str(exc), exc.args, and
        # pickling behave correctly. Without it, str(exc) was always "".
        super().__init__(message)
        self.message = message
        self.info = info or {}


class ConfigurationError(MetaError):
    """A configuration error."""


class IgnorableError(MetaError):
    """An error that can be ignored."""


class TraceTimeoutError(OperationalError):
    """Failure to complete an API Trace within the timeout."""


class TraceValidationError(OperationalError):
    """Failure to complete the expected write operation."""


@runtime_checkable
class ExceptionWithProps(Protocol):
    """Structural type for exceptions that carry telemetry properties.

    @runtime_checkable allows isinstance() checks against this protocol
    (method presence only — signatures are not verified at runtime).
    """

    def get_telemetry_props(self) -> Dict[str, Any]: ...


def should_show_stack_trace(exc: Exception) -> bool:
    """Decide whether a stack trace should be shown for ``exc``.

    Returns False for pydantic ValidationErrors — either raised directly or
    as the __cause__ of a chained exception. Any other exception can opt out
    by setting a SHOW_STACK_TRACE attribute to False; the default is True.
    """
    for candidate in (exc, exc.__cause__):
        if isinstance(candidate, ValidationError):
            return False

    return getattr(exc, "SHOW_STACK_TRACE", True)


class ConfigurationWarning(Warning):
    """A configuration warning, suitable for use with warnings.warn()."""


class ConfigurationMechanism(ABC):
    """Abstract interface for loading a raw config dict from a file-like object."""

    @abstractmethod
    def load_config(self, config_fp: IO) -> dict:
        """Parse ``config_fp`` and return the raw configuration as a dict."""
        pass


class AllowDenyPattern(ConfigModel):
    """A class to store allow deny regexes.

    A string is allowed iff it matches at least one allow pattern and no deny
    pattern; deny patterns take precedence. Patterns are anchored at the start
    of the string (re.match semantics).
    """

    # This regex is used to check if a given rule is a regex expression or a literal.
    # Note that this is not a perfect check. For example, the '.' character should
    # be considered a regex special character, but it's used frequently in literal
    # patterns and hence we allow it anyway.
    IS_SIMPLE_REGEX: ClassVar = re.compile(r"^[A-Za-z0-9 _.-]+$")

    allow: List[str] = Field(
        default=[".*"],
        description="List of regex patterns to include in ingestion",
    )
    deny: List[str] = Field(
        default=[],
        description="List of regex patterns to exclude from ingestion.",
    )
    ignoreCase: Optional[bool] = Field(
        default=True,
        description="Whether to ignore case sensitivity during pattern matching.",
    )  # Name comparisons should default to ignoring case

    @property
    def regex_flags(self) -> int:
        """The re flags used for all matching performed by this pattern."""
        return re.IGNORECASE if self.ignoreCase else 0

    @classmethod
    def allow_all(cls) -> "AllowDenyPattern":
        """Return a pattern with default settings that allows everything.

        Fix: use cls() instead of a hard-coded AllowDenyPattern() so that
        subclasses get an instance of the subclass.
        """
        return cls()

    def allowed(self, string: str) -> bool:
        """Return True if ``string`` matches an allow pattern and no deny pattern."""
        if self.denied(string):
            return False

        return any(
            re.match(allow_pattern, string, self.regex_flags)
            for allow_pattern in self.allow
        )

    def denied(self, string: str) -> bool:
        """Return True if ``string`` matches any deny pattern."""
        for deny_pattern in self.deny:
            if re.match(deny_pattern, string, self.regex_flags):
                return True

        return False

    def is_fully_specified_allow_list(self) -> bool:
        """
        If the allow patterns are literals and not full regexes, then it is considered
        fully specified. This is useful if you want to convert a 'list + filter'
        pattern into a 'search for the ones that are allowed' pattern, which can be
        much more efficient in some cases.
        """
        return all(
            self.IS_SIMPLE_REGEX.match(allow_pattern) for allow_pattern in self.allow
        )

    def get_allowed_list(self) -> List[str]:
        """Return the list of allowed strings as a list, after taking into account deny patterns, if possible"""
        if not self.is_fully_specified_allow_list():
            raise ValueError(
                "allow list must be fully specified to get list of allowed strings"
            )
        return [a for a in self.allow if not self.denied(a)]

    def __eq__(self, other):  # type: ignore
        # NOTE: compares __dict__ directly rather than using pydantic's
        # field-based equality; isinstance makes this asymmetric for subclasses.
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__


class KeyValuePattern(ConfigModel):
    """
    The key-value pattern is used to map a regex pattern to a set of values.
    For example, you can use it to map a table name to a list of tags to apply to it.
    """

    # Mapping of regex pattern -> values; the default matches everything and
    # yields no values.
    rules: Dict[str, List[str]] = {".*": []}
    first_match_only: bool = Field(
        default=True,
        description="Whether to stop after the first match. If false, all matching rules will be applied.",
    )

    @classmethod
    def all(cls) -> "KeyValuePattern":
        """Return a pattern with default settings.

        Fix: use cls() instead of a hard-coded KeyValuePattern() so that
        subclasses get an instance of the subclass.
        """
        return cls()

    def value(self, string: str) -> List[str]:
        """Return the values of the rule(s) whose pattern matches ``string``.

        With first_match_only, only the first matching rule (in rule insertion
        order) contributes; otherwise the values of all matching rules are
        concatenated and de-duplicated. Matching uses re.match (anchored at
        the start of the string).
        """
        matching_keys = [key for key in self.rules if re.match(key, string)]
        if not matching_keys:
            return []
        elif self.first_match_only:
            return self.rules[matching_keys[0]]
        else:
            return deduplicate_list(
                [v for key in matching_keys for v in self.rules[key]]
            )


class VersionedConfig(ConfigModel):
    """Base class for configs that carry an explicit schema version."""

    # LaxStr coerces non-string inputs (e.g. the int 1) to a string.
    version: LaxStr = "1"
